Classifying Tennis Strokes - Computer Vision Project¶


Problem Statement¶

  • In this project, computer vision is used to distinguish between the following tennis strokes: forehand, backhand, and serve.

  • The trained model should learn patterns that also transfer to a new dataset.

  • For the first dataset, we found an article in which 500 images per stroke had already been classified. The article is titled "Motion Analysis of Tennis Strokes Using Pose Estimation".

    Link to the article:
    https://www.sciencedirect.com/science/article/pii/S2352340924006322

  • For the second dataset, we collected images ourselves by filming these strokes and then converting the footage into still images.

  • The goal of this experiment is to develop a model that not only achieves high test accuracy within a single dataset but also recognizes patterns across multiple datasets.

Preparing the Images - Cropping¶

Visualizing the Images¶

In [1]:
import os
import matplotlib.pyplot as plt
from PIL import Image

# Absolute path to the folder containing the category subfolders
parent_folder_path = "/teamspace/studios/this_studio/final_folder_testing/original_pictures"

# Define the categories
categories = ["forehand", "backhand", "serve"]

# Function to display one sample image from each category
def show_images_from_categories(categories, parent_folder_path):
    fig, axes = plt.subplots(1, len(categories), figsize=(15, 5))
    
    for ax, category in zip(axes, categories):
        category_path = os.path.join(parent_folder_path, category)
        
        # Collect the image files in this category folder
        image_files = [f for f in os.listdir(category_path) if f.lower().endswith(('png', 'jpg', 'jpeg'))]
        
        if image_files:
            # Show the first image
            image_path = os.path.join(category_path, image_files[0])
            img = Image.open(image_path)
            
            ax.imshow(img)
            ax.set_title(category)
            ax.axis("off")
        else:
            ax.set_title(f"No images in '{category}'")
            ax.axis("off")
    
    plt.tight_layout()
    plt.show()

# Display images
show_images_from_categories(categories, parent_folder_path)

YOLOv5 for Image Cropping¶

  • YOLOv5 detects the people in the tennis images.
  • It identifies the bounding boxes around the players.
  • The bounding boxes are then cropped with a margin and scaled to a uniform target size.
In [1]:
import torch
from pathlib import Path
from PIL import Image

# Load the YOLOv5 model used for person detection (same call as in the later cropping cell)
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)

def crop_and_resize_image(image_path: str, output_folder: str, base_folder: str, margin=0.5, target_size=(224, 224)):
    try:
        # Derive the relative path to retain folder structure
        relative_path = Path(image_path).relative_to(base_folder)  # e.g., backhand/B_001.jpeg
        new_file_path = Path(output_folder) / relative_path  # e.g., cropped_original/original_pictures/backhand/B_001.jpeg

        # Skip if the file already exists
        if new_file_path.exists():
            return True

        # Load the image
        image = Image.open(image_path)
        results = model(image_path)  # Run YOLO detection on the image

        # Filter detections for 'person' class
        person_detections = [det for det in results.xyxy[0].tolist() if int(det[5]) == 0]  # Class '0' is 'person'

        if not person_detections:
            print(f"No person detected in {image_path}")
            return False

        # Find the largest bounding box for the closest person
        closest_person = max(person_detections, key=lambda det: (det[2] - det[0]) * (det[3] - det[1]))

        # Extract bounding box coordinates
        xmin, ymin, xmax, ymax = map(int, closest_person[:4])

        # Calculate margin for consistent cropping
        img_width, img_height = image.size
        box_width = xmax - xmin
        box_height = ymax - ymin

        # Apply the margin to create a similar crop
        xmin = max(0, xmin - int(box_width * margin))
        ymin = max(0, ymin - int(box_height * margin))
        xmax = min(img_width, xmax + int(box_width * margin))
        ymax = min(img_height, ymax + int(box_height * margin))

        # Crop the image around the player
        cropped_image = image.crop((xmin, ymin, xmax, ymax))

        # Resize to a fixed target size for consistency
        resized_image = cropped_image.resize(target_size)

        new_file_path.parent.mkdir(parents=True, exist_ok=True)

        # Save the resized and cropped image
        resized_image.save(new_file_path)
        print(f"Saved cropped and resized image to {new_file_path}")
    except Exception as e:
        print(f"Error processing {image_path}: {e}")
        return False
    return True

Visualizing the Images After Cropping¶

In [3]:
%matplotlib inline
In [2]:
import os
from PIL import Image
import matplotlib.pyplot as plt

# Define paths and categories
base_path = '/teamspace/studios/this_studio/final_folder/cropped/original_pictures'
categories = ['backhand', 'forehand', 'serve']

# Visualize a few samples from each category
def visualize_samples(base_path, categories):
    fig, axes = plt.subplots(1, len(categories), figsize=(15, 5))
    for i, category in enumerate(categories):
        folder_path = os.path.join(base_path, category)
        sample_image = os.listdir(folder_path)[0]  # Take the first image from each category
        img_path = os.path.join(folder_path, sample_image)
        image = Image.open(img_path)
        axes[i].imshow(image)
        axes[i].set_title(category)
        axes[i].axis("off")
    plt.tight_layout()
    plt.show()

visualize_samples(base_path, categories)

Transformation and Splitting¶

  • Transformations such as Resize and Normalize prepare the images for a model (in our case ResNet) by bringing them into a uniform format.
  • The images are resized, converted to tensors, and normalized.
  • The labels are based on the three categories (forehand, backhand, serve), and the data is split into training, validation, and test sets.
  • train_test_split performs a stratified split so that the class distribution is preserved in each subset.
In [3]:
from sklearn.model_selection import train_test_split
from torchvision import transforms

# Transformation pipeline
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load all image paths and labels
image_paths = []
labels = []
category_to_label = {category: idx for idx, category in enumerate(categories)}

for category in categories:
    folder_path = os.path.join(base_path, category)
    for file in os.listdir(folder_path):
        if file.endswith('.jpeg'):
            image_paths.append(os.path.join(folder_path, file))
            labels.append(category_to_label[category])

# Split data into train, validation, and test
train_paths, temp_paths, train_labels, temp_labels = train_test_split(
    image_paths, labels, test_size=0.3, stratify=labels, random_state=42
)
val_paths, test_paths, val_labels, test_labels = train_test_split(
    temp_paths, temp_labels, test_size=0.5, stratify=temp_labels, random_state=42
)

print(f"Train size: {len(train_paths)}, Validation size: {len(val_paths)}, Test size: {len(test_paths)}")
Train size: 1050, Validation size: 225, Test size: 225

Visualizing the Transformed Images¶

  • Resize: images are scaled to a uniform size of (224, 224) to be compatible with models such as ResNet.
  • ToTensor: images are converted to PyTorch tensors, with pixel values scaled to the range [0, 1].
  • Normalize: the color channels are standardized with the means [0.485, 0.456, 0.406] and standard deviations [0.229, 0.224, 0.225] (the ImageNet statistics), which improves training stability and performance.
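The Normalize step shifts pixel values outside [0, 1], which is why matplotlib reports clipping when the transformed tensors are shown directly. A minimal sketch of a helper (our own addition, not part of the notebook) that reverses the normalization before plotting:

```python
import torch

# ImageNet statistics used in the transform pipeline above
MEAN = torch.tensor([0.485, 0.456, 0.406])
STD = torch.tensor([0.229, 0.224, 0.225])

def denormalize(tensor: torch.Tensor) -> torch.Tensor:
    """Undo Normalize so a CHW image tensor is back in [0, 1] for imshow."""
    img = tensor * STD[:, None, None] + MEAN[:, None, None]
    return img.clamp(0.0, 1.0)
```

Plotting `denormalize(image).permute(1, 2, 0)` instead of the raw tensor would avoid the clipping warning seen below.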
In [4]:
from torch.utils.data import Dataset
import torch

class TennisDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, label

# Visualize transformed images
def visualize_transformed_images(dataset, categories):
    fig, axes = plt.subplots(1, 5, figsize=(15, 5))
    for i in range(5):
        image, label = dataset[i]
        axes[i].imshow(image.permute(1, 2, 0))  # Convert CHW to HWC
        axes[i].set_title(categories[label])
        axes[i].axis("off")
    plt.tight_layout()
    plt.show()

train_dataset = TennisDataset(train_paths, train_labels, transform)
visualize_transformed_images(train_dataset, categories)
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).

Model Architecture¶

  • A pretrained ResNet-18 model is used to classify the tennis strokes. ResNet is a standard choice for image classification and therefore well suited to this task. Deeper models such as ResNet-50 would already be overly complex here, while a simple CNN would not be expressive enough.
  • The final layer of the model (fc) is replaced to match the target classes (forehand, backhand, serve).

Data Preparation¶

  • The dataset is split into training, validation, and test sets.
  • The data is provided for training, validation, and testing via DataLoader instances.

Model Configuration¶

  • Loss function: CrossEntropyLoss is used, as it is suited to multi-class classification problems.
  • Optimizer: Adam with a learning rate of 0.001 provides efficient training.
  • Test set: a held-out test set is kept for the final evaluation of the model on unseen data.
In [5]:
from torchvision import models
import torch.nn as nn
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

# Define the model, loss function, and optimizer
model = models.resnet18(pretrained=True)
model.fc = nn.Linear(model.fc.in_features, len(categories))  # Adjust output layer for categories
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Split data into train, validation, and test sets
train_paths, temp_paths, train_labels, temp_labels = train_test_split(
    image_paths, labels, test_size=0.3, stratify=labels, random_state=42
)
val_paths, test_paths, val_labels, test_labels = train_test_split(
    temp_paths, temp_labels, test_size=0.5, stratify=temp_labels, random_state=42
)

# Create datasets
train_dataset = TennisDataset(train_paths, train_labels, transform)
val_dataset = TennisDataset(val_paths, val_labels, transform)
test_dataset = TennisDataset(test_paths, test_labels, transform)  # Added test dataset

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)  # Added test DataLoader

print("Model, datasets, and DataLoaders initialized successfully!")
/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/torchvision/models/_utils.py:208: UserWarning: The parameter 'pretrained' is deprecated since 0.13 and may be removed in the future, please use 'weights' instead.
  warnings.warn(
/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/torchvision/models/_utils.py:223: UserWarning: Arguments other than a weight enum or `None` for 'weights' are deprecated since 0.13 and may be removed in the future. The current behavior is equivalent to passing `weights=ResNet18_Weights.IMAGENET1K_V1`. You can also use `weights=ResNet18_Weights.DEFAULT` to get the most up-to-date weights.
  warnings.warn(msg)
Model, datasets, and DataLoaders initialized successfully!

Training and Validation with Early Stopping¶

  • Metric initialization: lists for train_loss, val_loss, train_acc, and val_acc store the history across epochs.

  • Training:

    • Training phase:
      • The model is set to training mode (model.train()).
      • Per batch:
        • Gradients are reset (optimizer.zero_grad()).
        • Inputs are passed through the model and the loss is computed.
        • Gradients are computed (loss.backward()) and the optimizer updates the model weights (optimizer.step()).
      • Training loss and accuracy are accumulated on the fly.
  • Validation:

    • Validation phase:
      • The model is set to evaluation mode (model.eval()).
      • No gradient computation (torch.no_grad()).
      • Validation loss and accuracy are accumulated on the fly.
  • Early stopping:

    • Checks whether the validation loss has improved.
    • If there is no improvement for early_stop_patience epochs, training stops.
  • Results per epoch:

    • Training and validation metrics (loss and accuracy) are printed at the end of each epoch.
In [8]:
# Initialize lists to store metrics
train_loss = []
val_loss = []
train_acc = []
val_acc = []

epochs = 10
early_stop_patience = 3  # Stop after 3 consecutive epochs without improvement
best_val_loss = float('inf')
early_stop_counter = 0

for epoch in range(epochs):
    # Training phase
    model.train()
    running_loss = 0.0
    correct_train = 0
    total_train = 0

    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        # Calculate training accuracy
        _, predicted = torch.max(outputs, 1)
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()

    # Store training metrics
    train_loss.append(running_loss / len(train_loader))
    train_acc.append(100 * correct_train / total_train)

    # Validation phase
    model.eval()
    running_val_loss = 0.0
    correct_val = 0
    total_val = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_val_loss += loss.item()

            # Calculate validation accuracy
            _, predicted = torch.max(outputs, 1)
            total_val += labels.size(0)
            correct_val += (predicted == labels).sum().item()

    # Store validation metrics
    val_loss.append(running_val_loss / len(val_loader))
    val_acc.append(100 * correct_val / total_val)

    # Check for early stopping
    if val_loss[-1] < best_val_loss:
        best_val_loss = val_loss[-1]
        early_stop_counter = 0
    else:
        early_stop_counter += 1
        if early_stop_counter >= early_stop_patience:
            print(f"Early stopping triggered at epoch {epoch + 1}")
            break

    # Print metrics for the epoch
    print(f"Epoch {epoch + 1}/{epochs}")
    print(f"Train Loss: {train_loss[-1]:.4f}, Train Accuracy: {train_acc[-1]:.2f}%")
    print(f"Validation Loss: {val_loss[-1]:.4f}, Validation Accuracy: {val_acc[-1]:.2f}%")
Epoch 1/10
Train Loss: 0.3348, Train Accuracy: 87.43%
Validation Loss: 1.6032, Validation Accuracy: 67.11%
Epoch 2/10
Train Loss: 0.1306, Train Accuracy: 95.90%
Validation Loss: 0.0959, Validation Accuracy: 94.67%
Epoch 3/10
Train Loss: 0.0461, Train Accuracy: 98.29%
Validation Loss: 0.1900, Validation Accuracy: 92.44%
Epoch 4/10
Train Loss: 0.0335, Train Accuracy: 98.95%
Validation Loss: 0.1395, Validation Accuracy: 96.44%
Early stopping triggered at epoch 5

Result: a validation accuracy of 96.44 % was reached. We now check on the test set whether this also holds on unseen data or whether the model has overfitted.

In [9]:
# Save the trained model
model_save_path = "/teamspace/studios/this_studio/final_folder_testing/trained_tennis_model.pth"  # Hardcoded path
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")
Model saved to /teamspace/studios/this_studio/final_folder_testing/trained_tennis_model.pth

The model is saved so that it can easily be reused later.

Model Evaluation¶

  • The model is evaluated on the test data to assess its performance on new, unseen data.
  • The results are visualized with a confusion matrix to assess the classification quality in detail.
In [10]:
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report

# Test evaluation
model.eval()
true_labels = []  # renamed so the split's test_labels list is not overwritten
pred_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:  # Use test_loader here
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        true_labels.extend(labels.numpy())
        pred_labels.extend(predicted.numpy())

# Calculate test accuracy
correct = sum(1 for true, pred in zip(true_labels, pred_labels) if true == pred)
total = len(true_labels)
test_accuracy = 100 * correct / total
print(f"Test Accuracy: {test_accuracy:.2f}%")

# Confusion matrix for test set
test_cm = confusion_matrix(true_labels, pred_labels)

# Heatmap with blue-red colormap
sns.heatmap(test_cm, annot=True, fmt="d", xticklabels=categories, yticklabels=categories, cmap="coolwarm")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Test Confusion Matrix")
plt.show()

# Classification report for test set
print("Test Set Classification Report")
print(classification_report(true_labels, pred_labels, target_names=categories))
Test Accuracy: 96.89%
Test Set Classification Report
              precision    recall  f1-score   support

    backhand       0.99      0.96      0.97        75
    forehand       0.93      1.00      0.96        75
       serve       1.00      0.95      0.97        75

    accuracy                           0.97       225
   macro avg       0.97      0.97      0.97       225
weighted avg       0.97      0.97      0.97       225

Conclusions from the Test Matrix and Results¶

  • With 97% overall accuracy, the model's predictions are very good.

  • Only a few forehand and backhand strokes were misclassified.

  • Model performance: the model already produced very good results after only a few epochs, so the focus is on adding and comparing new image data.

  • No fine-tuning: we skipped fine-tuning the ResNet model, since a high accuracy was reached quickly. Instead, we check how well the model generalizes to new data. My project partner Simon Bieri is running fine-tuning in parallel.

  • Goal: verify whether the model trained on the original data also generalizes reliably to new data.
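The generalization check described above can be sketched as follows; the helper is our own addition, and `new_data_paths`/`new_data_labels` stand in for the own dataset prepared in the next sections:

```python
import torch
from torch.utils.data import DataLoader

def evaluate_accuracy(model, loader) -> float:
    """Plain accuracy (in %) of a classifier over a DataLoader of (image, label) batches."""
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for inputs, labels in loader:
            preds = model(inputs).argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
    return 100.0 * correct / total

# Hypothetical usage with the own dataset from the next sections:
# new_loader = DataLoader(TennisDataset(new_data_paths, new_data_labels, transform), batch_size=32)
# print(f"Accuracy on own data: {evaluate_accuracy(model, new_loader):.2f}%")
```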

Adding More Data (Own Data)¶

  • Next steps: the following section describes how the new image files were added.

This code extracts frames from the given video files (backhand.MOV, forehand.MOV, serve.MOV) and stores them in the corresponding folders inside the new_data directory. Frames are saved at regular intervals (every 8th frame, which proved to work well), preserving the original folder structure.

In [3]:
import os
import cv2
from pathlib import Path

# Absolute paths to video files
video_paths = {
    'backhand.MOV': '/teamspace/studios/this_studio/final_folder_testing/backhand.MOV',
    'forehand.MOV': '/teamspace/studios/this_studio/final_folder_testing/forehand.MOV',
    'serve.MOV': '/teamspace/studios/this_studio/final_folder_testing/serve.MOV'
}

# Base path for the new data folder (relative to the notebook directory or absolute URL path)
new_data_base_path = '/teamspace/studios/this_studio/final_folder_testing/new_data'

# Create directories to save frames in the new_data folder
output_folders = {
    'backhand_frames': Path(new_data_base_path) / 'backhand_frames',
    'forehand_frames': Path(new_data_base_path) / 'forehand_frames',
    'serve_frames': Path(new_data_base_path) / 'serve_frames'
}

# Create the directories if they don't exist
for folder in output_folders.values():
    os.makedirs(folder, exist_ok=True)

# Function to extract frames from a video
def extract_frames(video_path, output_folder, frame_interval=8):
    if not os.path.exists(video_path):
        print(f"Error: Video file {video_path} does not exist.")
        return

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Unable to open video file {video_path}.")
        return

    frame_count = 0
    saved_frame_count = 0
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        # Save frames at specific intervals
        if frame_count % frame_interval == 0:
            frame_filename = output_folder / f'frame_{saved_frame_count:04d}.jpg'
            
            # Skip saving if the file already exists
            if not frame_filename.exists():
                cv2.imwrite(str(frame_filename), frame)
                saved_frame_count += 1
        
        frame_count += 1
    
    cap.release()

# Extract frames for each video
for video_name, video_path in video_paths.items():
    output_folder = output_folders[video_name.split('.')[0] + '_frames']
    print(f"Processing {video_name}...")
    extract_frames(video_path, output_folder)

print("Frame extraction completed. Frames are stored in 'new_data'.")
Processing backhand.MOV...
Processing forehand.MOV...
Processing serve.MOV...
Frame extraction completed. Frames are stored in 'new_data'.



Processing the Video Footage¶

  • We process video footage for the different tennis stroke types:
    • forehand, backhand, and serve.
  • The stroke types are labeled in a separate text file to define the data structure clearly.
  • To speed up the process, we use sequences of 5 images to extract and store the information efficiently.
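The index files used below (F_indexes.txt, B_indexes.txt, S_indexes.txt) contain one frame index per line, each marking the end of a stroke; the 5 frames ending at that index form one sequence. A small sketch of this windowing logic (the sample indexes are made up):

```python
def sequence_window(file_index: int, num_files: int = 5) -> list:
    """Frame indexes belonging to one sequence: a window of num_files frames ending at file_index."""
    return list(range(file_index - num_files + 1, file_index + 1))

# Hypothetical index file content: one end-of-stroke frame index per line
sample_indexes = [12, 40]
for idx in sample_indexes:
    print(idx, sequence_window(idx))
# e.g. index 12 -> frames [8, 9, 10, 11, 12]
```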
In [4]:
import os
import shutil
from pathlib import Path
import logging
from logging import StreamHandler

# Configure logging
log_level = logging.INFO
logger = logging.getLogger(__name__)
logger.setLevel(log_level)
logging_formatter = logging.Formatter(fmt="%(asctime)s %(levelname)-8s %(name)-15s %(message)s",
                                      datefmt="%Y-%m-%d %H:%M:%S")
stream_handler = StreamHandler()
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(logging_formatter)
logger.addHandler(stream_handler)

####################### INPUT DEFINITIONS ##############################################################################
# Base folder paths (updated to the new directory)
base_folder = Path("/teamspace/studios/this_studio/final_folder_testing")
image_source_paths = {
    "forehand": base_folder.joinpath("new_data/forehand_frames"),
    "backhand": base_folder.joinpath("new_data/backhand_frames"),
    "serve": base_folder.joinpath("new_data/serve_frames")
}
image_target_path = base_folder.joinpath("processed_images")
index_file_map = {
    "forehand": base_folder.joinpath("F_indexes.txt"),
    "backhand": base_folder.joinpath("B_indexes.txt"),
    "serve": base_folder.joinpath("S_indexes.txt"),
}

# General settings
num_files_per_sequence = 5

##################### END INPUT DEFINITIONS ############################################################################

# Main logic for processing all hit types
logger.info("Start script")
try:
    for hit_type, source_path in image_source_paths.items():
        # Validate hit_type
        if hit_type not in index_file_map:
            logger.warning(f"No index file found for {hit_type}. Skipping...")
            continue

        # File prefix based on hit_type
        file_prefix = hit_type[0].upper()

        # Load indexes
        index_file_path = index_file_map[hit_type]
        if not index_file_path.exists():
            logger.warning(f"Index file {index_file_path} does not exist. Skipping {hit_type}...")
            continue

        with open(index_file_path, "r") as f:
            lines = f.readlines()
            indexes = [int(x.strip("\n")) for x in lines if len(x.strip("\n")) > 0]

        # Ensure the target directory exists
        target_path = image_target_path.joinpath(hit_type)
        os.makedirs(target_path, exist_ok=True)

        # Process frames
        for sequence_index, file_index in enumerate(indexes):
            for seq_file_index, i in enumerate(range(file_index - num_files_per_sequence + 1, file_index + 1)):
                # Adjusted to match your filename pattern (e.g., frame_XXXX.jpg)
                source_file = source_path.joinpath(f"frame_{i:04}.jpg")
                target_file = target_path.joinpath(f"{file_prefix}_{sequence_index:03}_{seq_file_index:03}.jpeg")

                if source_file.exists():
                    shutil.copyfile(source_file, target_file)
                else:
                    logger.warning(f"Source file {source_file} does not exist and was skipped.")

        logger.info(f"Finished processing {hit_type}. {len(indexes)} sequences copied to {target_path}")

except Exception as e:
    logger.exception("Exception occurred", exc_info=True)
finally:
    logger.info("Script completed.")
2024-11-27 06:35:53 INFO     __main__        Start script
2024-11-27 06:35:53 INFO     __main__        Finished processing forehand. 27 sequences copied to /teamspace/studios/this_studio/final_folder_testing/processed_images/forehand
2024-11-27 06:35:53 INFO     __main__        Finished processing backhand. 51 sequences copied to /teamspace/studios/this_studio/final_folder_testing/processed_images/backhand
2024-11-27 06:35:53 INFO     __main__        Finished processing serve. 43 sequences copied to /teamspace/studios/this_studio/final_folder_testing/processed_images/serve
2024-11-27 06:35:53 INFO     __main__        Script completed.

Visualizing a Sequence¶

In [13]:
import matplotlib.pyplot as plt
from pathlib import Path

# Function to visualize a sequence of 5 images
def visualize_sequence(sequence_folder, sequence_prefix, num_images=5):
    images = sorted(sequence_folder.glob(f"{sequence_prefix}_*.jpeg"))
    if len(images) < num_images:
        print(f"Not enough images in {sequence_folder} for sequence {sequence_prefix}")
        return

    fig, axes = plt.subplots(1, num_images, figsize=(15, 5))
    for i, image_path in enumerate(images[:num_images]):
        img = plt.imread(image_path)
        axes[i].imshow(img)
        axes[i].set_title(image_path.name)
        axes[i].axis("off")

    plt.tight_layout()
    plt.show()

# Example usage with relative paths based on current working directory
current_dir = Path.cwd()  # Assuming script is running from the base folder location
processed_folder = current_dir / "final_folder_testing/processed_images"  # Updated path
hit_type = "forehand"  # Change to "backhand" or "serve" as needed
sequence_folder = processed_folder / hit_type  # Path to the folder containing sequences
sequence_prefix = "F_000"  # Adjust the prefix to match a specific sequence

# Display the first sequence
visualize_sequence(sequence_folder, sequence_prefix)
  • The final labeling is done in a separate text file, shortly before the project submission.
  • From each sequence, 5 images are selected so that only the relevant images are extracted for processing and analysis.

Cropping¶

In the next step, as with the original images at the beginning, YOLOv5 is used to crop the photos via bounding boxes. This makes the different datasets comparable.

In [5]:
import torch
from pathlib import Path
from PIL import Image
import os

# Load the YOLOv5 model
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)

def crop_and_resize_image(image_path: str, output_folder: str, base_folder: str, margin=0.5, target_size=(224, 224)):
    try:
        # Derive the relative path to retain folder structure
        relative_path = Path(image_path).relative_to(base_folder)  # e.g., backhand/B_001.jpeg
        new_file_path = Path(output_folder) / relative_path  # e.g., cropped_original/original_pictures/backhand/B_001.jpeg

        # Skip if the file already exists
        if new_file_path.exists():
            return True

        # Load the image
        image = Image.open(image_path)
        results = model(image_path)  # Run YOLO detection on the image

        # Filter detections for 'person' class
        person_detections = [det for det in results.xyxy[0].tolist() if int(det[5]) == 0]  # Class '0' is 'person'

        if not person_detections:
            return False

        # Find the largest bounding box for the closest person
        closest_person = max(person_detections, key=lambda det: (det[2] - det[0]) * (det[3] - det[1]))

        # Extract bounding box coordinates
        xmin, ymin, xmax, ymax = map(int, closest_person[:4])

        # Calculate margin for consistent cropping
        img_width, img_height = image.size
        box_width = xmax - xmin
        box_height = ymax - ymin

        # Apply the margin to create a similar crop
        xmin = max(0, xmin - int(box_width * margin))
        ymin = max(0, ymin - int(box_height * margin))
        xmax = min(img_width, xmax + int(box_width * margin))
        ymax = min(img_height, ymax + int(box_height * margin))

        # Crop the image around the player
        cropped_image = image.crop((xmin, ymin, xmax, ymax))

        # Resize to a fixed target size for consistency
        resized_image = cropped_image.resize(target_size)

        # Ensure the output folder exists
        new_file_path.parent.mkdir(parents=True, exist_ok=True)

        # Save the resized and cropped image
        resized_image.save(new_file_path)
    except Exception as e:
        print(f"Error processing {image_path}: {e}")
        return False
    return True

# Define paths for the dataset (relative paths)
datasets = {
    "original_pictures": "/teamspace/studios/this_studio/final_folder_testing/processed_images",  # Full path
}

# Output base path (new folder: cropped_original)
output_base_path = "/teamspace/studios/this_studio/final_folder_testing/cropped_processed"  # Specify the new folder here

# Process each dataset with consistent cropping and resizing
failed_paths = []
for dataset_name, dataset_path in datasets.items():
    # Get all image files recursively within the current directory
    all_file_paths = list(Path(dataset_path).rglob("*.jpeg"))  # Get all JPEG files recursively

    # Specify the correct output subpath (relative path)
    dataset_output_path = Path(output_base_path) / dataset_name  # e.g., cropped_original/original_pictures

    for file_path in all_file_paths:
        success = crop_and_resize_image(
            image_path=file_path.as_posix(),
            output_folder=dataset_output_path.as_posix(),
            base_folder=dataset_path,
            margin=0.5,
            target_size=(500, 500)
        )
        if not success:
            failed_paths.append(file_path)

# Output any failed images
if failed_paths:
    print("Failed Images:")
    for failed_path in failed_paths:
        print(failed_path)
Using cache found in /home/zeus/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2024-11-12 Python-3.10.10 torch-2.2.1+cu121 CPU

Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients, 16.4 GFLOPs
Adding AutoShape... 

This code moves image files from a source folder (original_pictures) into new target folders organized by category (backhand, forehand, serve). Missing target folders are created automatically. Finally, empty source folders are deleted if they no longer contain any files.

In [6]:
import os
from pathlib import Path
import shutil

# Path to the current directory containing 'original_pictures'
source_base_path = Path("/teamspace/studios/this_studio/final_folder_testing/cropped_processed/original_pictures")

# New base path without 'original_pictures'
target_base_path = Path("/teamspace/studios/this_studio/final_folder_testing/cropped_processed")

# All categories (folders such as 'backhand', 'forehand', 'serve')
categories = ["backhand", "forehand", "serve"]

# Move the images
if source_base_path.exists():
    for category in categories:
        source_path = source_base_path / category
        target_path = target_base_path / category

        # Skip if source path does not exist
        if not source_path.exists():
            continue

        # Create the target folder if it does not exist
        os.makedirs(target_path, exist_ok=True)

        # Move all files in the source folder
        for file in source_path.iterdir():
            if file.is_file() and file.suffix.lower() in [".jpeg", ".jpg", ".png"]:  # Move image files only
                target_file = target_path / file.name
                shutil.move(str(file), str(target_file))  # Move the file

    # Optional: remove empty directories
    for category in categories:
        source_path = source_base_path / category
        if source_path.exists() and not any(source_path.iterdir()):
            source_path.rmdir()

else:
    print(f"Source base path does not exist: {source_base_path}. Nothing to do.")

Visualization of the cropped images from our own dataset¶

In [2]:
import os
from PIL import Image
import matplotlib.pyplot as plt

# Define paths and categories
base_path = '/teamspace/studios/this_studio/final_folder_testing/cropped_processed'

categories = ['backhand', 'forehand', 'serve']

# Visualize a few samples from each category
def visualize_samples(base_path, categories):
    fig, axes = plt.subplots(1, len(categories), figsize=(15, 5))
    for i, category in enumerate(categories):
        folder_path = os.path.join(base_path, category)
        sample_image = os.listdir(folder_path)[0]  # Take the first image from each category
        img_path = os.path.join(folder_path, sample_image)
        image = Image.open(img_path)
        axes[i].imshow(image)
        axes[i].set_title(category)
        axes[i].axis("off")
    plt.tight_layout()
    plt.show()

visualize_samples(base_path, categories)

Testing the model on the second dataset¶

First, with the ResNet-18 model trained on the first dataset.¶

In [16]:
import os
from pathlib import Path
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt
from PIL import Image

# Define the second dataset's base path
second_dataset_path = "/teamspace/studios/this_studio/final_folder_testing/cropped_processed"

# Categories and label mapping
categories = ['forehand', 'backhand', 'serve']
category_to_label = {category: idx for idx, category in enumerate(categories)}

# Load all image paths and labels from the second dataset
image_paths = []
labels = []

for category in categories:
    folder_path = Path(second_dataset_path) / category
    if not folder_path.exists():
        print(f"Warning: {folder_path} does not exist. Skipping category {category}.")
        continue
    for file in folder_path.glob("*.jpeg"):  # Match JPEG files
        image_paths.append(str(file))
        labels.append(category_to_label[category])

# Define the custom dataset class
class TennisDataset(torch.utils.data.Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, label

# Define transformations for the second dataset
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Match the input size expected by the model
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Same normalization as during training
])

# Create the second dataset and DataLoader
second_dataset = TennisDataset(image_paths, labels, transform)
second_loader = DataLoader(second_dataset, batch_size=32, shuffle=False)

# Load the trained model from the hardcoded path
model_save_path = "/teamspace/studios/this_studio/final_folder_testing/trained_tennis_model.pth"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Assuming you're using a ResNet18 model
from torchvision import models
import torch.nn as nn

# Recreate the model architecture and load weights
model = models.resnet18(weights=None)  # weights=None is the current API; pretrained=False is deprecated
model.fc = nn.Linear(model.fc.in_features, len(categories))  # Adjust output layer for categories
model.load_state_dict(torch.load(model_save_path, map_location=device))
model.to(device)
model.eval()

# Evaluate the model on the second dataset
second_test_labels = []
second_test_preds = []

with torch.no_grad():
    for inputs, labels in second_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        second_test_labels.extend(labels.cpu().numpy().tolist())  # Flatten labels
        second_test_preds.extend(predicted.cpu().numpy().tolist())  # Flatten predictions

# Calculate test accuracy for the second dataset
correct = sum(1 for true, pred in zip(second_test_labels, second_test_preds) if true == pred)
total = len(second_test_labels)
test_accuracy = 100 * correct / total
print(f"Second Dataset Test Accuracy: {test_accuracy:.2f}%")

# Confusion matrix for the second dataset
second_test_cm = confusion_matrix(second_test_labels, second_test_preds)

# Heatmap visualization
sns.heatmap(second_test_cm, annot=True, fmt="d", xticklabels=categories, yticklabels=categories, cmap="coolwarm")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Second Dataset Confusion Matrix")
plt.show()

# Classification report for the second dataset
print("Second Dataset Classification Report")
print(classification_report(second_test_labels, second_test_preds, target_names=categories))
Second Dataset Test Accuracy: 25.29%
Second Dataset Classification Report
              precision    recall  f1-score   support

    forehand       0.15      0.41      0.22       135
    backhand       0.20      0.14      0.16       255
       serve       0.98      0.29      0.45       215

    accuracy                           0.25       605
   macro avg       0.44      0.28      0.28       605
weighted avg       0.47      0.25      0.28       605

Unfortunately, the test accuracy was only about 25% (weighted F1 of 0.28), which is even below the random-guess baseline of roughly 33% for three classes. Is there a way to improve the predictions without using the second dataset's data for training? To that end, we take the keypoint detection approach applied in the following section.

With Keypoint Detection¶

Description of the code¶

As a next step, we use MediaPipe Pose to detect and visualize human keypoints. The detected keypoints and their connections are overlaid on the original images. Below are two example images with keypoints.

In [6]:
import os
import cv2
import mediapipe as mp
import matplotlib.pyplot as plt

# Paths to the images (ensure these paths exist and images are accessible)
img_paths = [
    "/teamspace/studios/this_studio/final_folder_testing/cropped_original_pictures/original_pictures/backhand/B_001.jpeg",  # Image 1
    "/teamspace/studios/this_studio/final_folder_testing/cropped_processed/forehand/F_001_000.jpeg"  # Image 2
]

# Initialize MediaPipe Pose model
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# Create a figure for displaying both images
fig, axes = plt.subplots(1, 2, figsize=(20, 10))

# Loop through each image
for i, img_path in enumerate(img_paths):
    if not os.path.exists(img_path):
        print(f"Image not found: {img_path}")
        continue

    # Read the image
    image = cv2.imread(img_path)
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Perform keypoint detection
    results = pose.process(image_rgb)

    # Create a copy of the image to overlay keypoints
    image_with_keypoints = image.copy()

    if results.pose_landmarks:
        # Loop through each landmark and draw it on the image
        for landmark in results.pose_landmarks.landmark:
            # Convert normalized coordinates to pixel coordinates
            height, width, _ = image.shape
            x = int(landmark.x * width)
            y = int(landmark.y * height)

            # Draw the keypoint
            cv2.circle(image_with_keypoints, (x, y), 5, (0, 255, 0), -1)

        # Draw the connections between keypoints
        mp.solutions.drawing_utils.draw_landmarks(image_with_keypoints, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

    # Convert BGR to RGB for displaying with matplotlib
    image_with_keypoints_rgb = cv2.cvtColor(image_with_keypoints, cv2.COLOR_BGR2RGB)

    # Display the image with keypoints
    axes[i].imshow(image_with_keypoints_rgb)
    axes[i].axis('off')
    axes[i].set_title(f"Image {i+1} with Keypoints")

plt.tight_layout()
plt.show()
2024-11-25 08:09:40.241518: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-11-25 08:09:40.650878: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
E0000 00:00:1732522180.812181    9814 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1732522180.857255    9814 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-11-25 08:09:41.306645: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI AVX512_BF16 AVX512_FP16 AVX_VNNI AMX_TILE AMX_INT8 AMX_BF16 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
W0000 00:00:1732522185.911606   52387 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1732522185.953056   52387 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1732522185.983218   52385 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.

As a next step, MediaPipe Pose detects keypoints on the images in the categories forehand, backhand, and serve, and the coordinates [x, y, z, visibility] of each detection are stored in a log file. Images in which no keypoints are detected, or which cannot be read, are also logged with corresponding notes.

In [7]:
import os
import mediapipe as mp
import cv2

# Paths
input_path = "/teamspace/studios/this_studio/final_folder_testing/cropped_original_pictures/original_pictures"
output_log = "/teamspace/studios/this_studio/final_folder_testing/keypoints_log_original.txt"

# Initialize MediaPipe Pose model
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# Create a text file to log keypoints
with open(output_log, "w") as log_file:
    # Loop through each category folder
    for category in ["forehand", "backhand", "serve"]:
        category_path = os.path.join(input_path, category)
        if not os.path.exists(category_path):
            print(f"Warning: Category folder {category_path} does not exist. Skipping...")
            continue  # Skip if category folder does not exist

        # Loop through each image in the category
        for img_name in os.listdir(category_path):
            img_path = os.path.join(category_path, img_name)
            if not img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
                print(f"Skipping non-image file: {img_name}")
                continue  # Skip non-image files

            try:
                # Read the image
                image = cv2.imread(img_path)
                if image is None:
                    print(f"Error reading image: {img_path}. Skipping...")
                    log_file.write(f"Image: {img_path}\nError: Unable to read image.\n\n")
                    continue

                image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

                # Perform keypoint detection
                results = pose.process(image_rgb)

                if results.pose_landmarks:
                    # Extract keypoints as a list of [x, y, z, visibility]
                    keypoints = [
                        [lm.x, lm.y, lm.z, lm.visibility]
                        for lm in results.pose_landmarks.landmark
                    ]

                    # Write to log file
                    log_file.write(f"Image: {img_path}\n")
                    log_file.write(f"Keypoints detected (raw data): {keypoints}\n\n")
                else:
                    log_file.write(f"Image: {img_path}\n")
                    log_file.write("Keypoints detected (raw data): []\n\n")
            except Exception as e:
                log_file.write(f"Image: {img_path}\nError: {e}\n\n")
                print(f"Error processing {img_path}: {e}")

pose.close()
print(f"Keypoint detection completed. Results saved to {output_log}")
W0000 00:00:1732522191.816074   52518 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1732522191.849772   52517 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
Keypoint detection completed. Results saved to /teamspace/studios/this_studio/final_folder_testing/keypoints_log_original.txt

Example of a log entry:¶

image.png


Example of keypoint data (landmarks)¶

The following data represents landmarks detected by a pose estimation model. Each landmark contains 4 values:

  1. x: horizontal coordinate (relative to the image width, normalized to 0–1).
  2. y: vertical coordinate (relative to the image height, normalized to 0–1).
  3. z: depth coordinate (relative to the camera, normalized).
  4. visibility: confidence value (0–1) indicating how likely the landmark is visible.

Example data:¶

[
    [0.5, 0.6, -0.1, 0.98],   # nose (`x`, `y`, `z`, `visibility`)
    [0.48, 0.55, -0.1, 0.95], # left eye
    [0.52, 0.55, -0.1, 0.96], # right eye
    ...
    [0.4, 0.8, 0.1, 0.85]     # left ankle
]


Training and testing with the keypoints from the log file¶

In this part, keypoint data is loaded from the previously created log file, converted to tensors, and a multi-layer perceptron (MLP) is trained to classify tennis strokes (forehand, backhand, serve). The training and validation losses over the epochs are visualized, and the model accuracy is evaluated on a separate test set.

In [8]:
import re
import torch
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from ast import literal_eval

# Base path for the project
base_path = "/teamspace/studios/this_studio/final_folder_testing"

# Log file path
log_file = f"{base_path}/keypoints_log_original.txt"

# Define a dictionary to map shot types to labels
shot_type_to_label = {"forehand": 0, "backhand": 1, "serve": 2}

# Initialize lists to store keypoints and labels
keypoints_data = []
labels = []

# Counters for skipped and valid entries
skipped_count = 0
valid_count = 0

# Define a regex pattern to extract shot type from file path
shot_type_pattern = re.compile(r"(forehand|backhand|serve)")

# Expected keypoints length (33 landmarks * 4 values per landmark)
expected_keypoints_length = 33 * 4

# Parse the log file
with open(log_file, "r") as f:
    lines = f.readlines()

for line in lines:
    # Find the image path and determine shot type
    if line.startswith("Image:"):
        match = shot_type_pattern.search(line)
        if match:
            current_label = shot_type_to_label[match.group(0)]
    # Find keypoints data
    elif line.startswith("Keypoints detected (raw data):"):
        keypoints_str = line.split(":")[1].strip()
        keypoints_list = literal_eval(keypoints_str)  # Safer alternative to eval()
        flattened_keypoints = [value for kp in keypoints_list for value in kp]  # Flatten keypoints
        
        # Ensure keypoints have the correct length before appending
        if len(flattened_keypoints) == expected_keypoints_length:
            keypoints_data.append(flattened_keypoints)
            labels.append(current_label)
            valid_count += 1  # Increment valid count
        else:
            skipped_count += 1  # Increment skipped count

# Print the counts for skipped and valid entries
print(f"Valid entries processed: {valid_count}")
print(f"Skipped entries: {skipped_count}")

# Convert to tensors
X = torch.tensor(keypoints_data, dtype=torch.float32)
y = torch.tensor(labels, dtype=torch.long)

# Split into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Create DataLoaders for PyTorch
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Check if CUDA is available and select device accordingly
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# Define an MLP model for classification
class KeypointClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(KeypointClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Set up the model
input_dim = X_train.shape[1]
num_classes = len(shot_type_to_label)
model = KeypointClassifier(input_dim, num_classes).to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
epochs = 20
train_losses = []
val_losses = []

for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
    
    train_losses.append(running_loss / len(train_loader))

    # Validation phase
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            val_loss += loss.item()
    
    val_loss /= len(test_loader)
    val_losses.append(val_loss)

    print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_losses[-1]:.4f}, Validation Loss: {val_loss:.4f}")

# Evaluate the model
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        outputs = model(X_batch)
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()

accuracy = correct / total
print(f"Test Accuracy: {accuracy * 100:.2f}%")

# Plot training and validation loss
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label="Training Loss")
plt.plot(val_losses, label="Validation Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.title("Training and Validation Loss Over Epochs")
plt.show()
Valid entries processed: 1370
Skipped entries: 130
Using device: cpu
Epoch 1/20, Train Loss: 1.0031, Validation Loss: 0.7909
Epoch 2/20, Train Loss: 0.7056, Validation Loss: 0.5413
Epoch 3/20, Train Loss: 0.5571, Validation Loss: 0.4576
Epoch 4/20, Train Loss: 0.4765, Validation Loss: 0.4111
Epoch 5/20, Train Loss: 0.4202, Validation Loss: 0.3676
Epoch 6/20, Train Loss: 0.3899, Validation Loss: 0.3646
Epoch 7/20, Train Loss: 0.3555, Validation Loss: 0.3701
Epoch 8/20, Train Loss: 0.3731, Validation Loss: 0.3680
Epoch 9/20, Train Loss: 0.3648, Validation Loss: 0.3385
Epoch 10/20, Train Loss: 0.3389, Validation Loss: 0.3109
Epoch 11/20, Train Loss: 0.3243, Validation Loss: 0.3345
Epoch 12/20, Train Loss: 0.3204, Validation Loss: 0.2952
Epoch 13/20, Train Loss: 0.3066, Validation Loss: 0.2925
Epoch 14/20, Train Loss: 0.3133, Validation Loss: 0.2883
Epoch 15/20, Train Loss: 0.2848, Validation Loss: 0.2797
Epoch 16/20, Train Loss: 0.2920, Validation Loss: 0.2636
Epoch 17/20, Train Loss: 0.2820, Validation Loss: 0.3113
Epoch 18/20, Train Loss: 0.2827, Validation Loss: 0.2539
Epoch 19/20, Train Loss: 0.2611, Validation Loss: 0.2737
Epoch 20/20, Train Loss: 0.2661, Validation Loss: 0.2466
Test Accuracy: 90.51%

The test accuracy of 90.5% within the first dataset is roughly on par with the ResNet model; the decisive question is how well the model can be applied to the second dataset.

Applying the trained model to the second dataset¶

Below, MediaPipe Pose is used once more to extract keypoints from the cropped images in the categories forehand, backhand, and serve and to store them in a log file. This time the new, self-recorded images are used, so that the trained model can be tested on the second dataset.

In [9]:
import os
import mediapipe as mp
import cv2

# Updated Paths
input_path = "/teamspace/studios/this_studio/final_folder_testing/cropped_processed"
output_log = "/teamspace/studios/this_studio/final_folder_testing/processed_keypoints_log.txt"

# Initialize MediaPipe Pose model
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# Create a text file to log keypoints
with open(output_log, "w") as log_file:
    # Loop through each category folder
    for category in ["forehand", "backhand", "serve"]:
        category_path = os.path.join(input_path, category)
        if not os.path.exists(category_path):
            print(f"Warning: Category folder {category_path} does not exist. Skipping...")
            continue  # Skip if category folder does not exist

        # Loop through each image in the category
        for img_name in os.listdir(category_path):
            img_path = os.path.join(category_path, img_name)
            if not img_name.lower().endswith(('.jpg', '.jpeg', '.png')):
                print(f"Skipping non-image file: {img_name}")
                continue  # Skip non-image files

            try:
                # Read the image
                image = cv2.imread(img_path)
                if image is None:
                    print(f"Error reading image: {img_path}. Skipping...")
                    log_file.write(f"Image: {img_path}\nError: Unable to read image.\n\n")
                    continue

                image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

                # Perform keypoint detection
                results = pose.process(image_rgb)

                if results.pose_landmarks:
                    # Extract keypoints as a list of [x, y, z, visibility]
                    keypoints = [
                        [lm.x, lm.y, lm.z, lm.visibility]
                        for lm in results.pose_landmarks.landmark
                    ]

                    # Write to log file
                    log_file.write(f"Image: {img_path}\n")
                    log_file.write(f"Keypoints detected (raw data): {keypoints}\n\n")
                else:
                    log_file.write(f"Image: {img_path}\n")
                    log_file.write("Keypoints detected (raw data): []\n\n")
            except Exception as e:
                print(f"Error processing {img_path}: {e}")
                log_file.write(f"Image: {img_path}\nError: {e}\n\n")

print(f"Keypoint detection completed. Results saved to {output_log}")
W0000 00:00:1732522234.091883   53286 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1732522234.139160   53286 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
Keypoint detection completed. Results saved to /teamspace/studios/this_studio/final_folder_testing/processed_keypoints_log.txt

Applying the trained model to the second test dataset¶

The classification model trained on the first dataset's keypoint log file is applied to a second test dataset. The preprocessed keypoint data (33 landmarks with 4 values each) is loaded and the predictions are compared with the labels (forehand, backhand, serve).

In [10]:
import re
import torch
from torch.utils.data import DataLoader, TensorDataset
from ast import literal_eval
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Path to the processed keypoints log file
test_log_file = "/teamspace/studios/this_studio/final_folder_testing/processed_keypoints_log.txt"

# Define a dictionary to map shot types to labels
shot_type_to_label = {"forehand": 0, "backhand": 1, "serve": 2}

# Initialize lists to store keypoints and labels for testing
test_keypoints_data = []
test_labels = []

# Define a regex pattern to extract shot type from file path
shot_type_pattern = re.compile(r"(forehand|backhand|serve)")

# Expected keypoints length (33 landmarks * 4 values per landmark)
expected_keypoints_length = 33 * 4

# Parse the test log file
with open(test_log_file, "r") as f:
    lines = f.readlines()

skipped_count = 0
valid_count = 0

for line in lines:
    # Find the image path and determine shot type
    if line.startswith("Image:"):
        match = shot_type_pattern.search(line)
        if match:
            current_label = shot_type_to_label[match.group(0)]
    # Find keypoints data
    elif line.startswith("Keypoints detected (raw data):"):
        keypoints_str = line.split(":")[1].strip()
        keypoints_list = literal_eval(keypoints_str)  # Safer alternative to eval()
        flattened_keypoints = [value for kp in keypoints_list for value in kp]  # Flatten keypoints

        # Ensure keypoints have the correct length before appending
        if len(flattened_keypoints) == expected_keypoints_length:
            test_keypoints_data.append(flattened_keypoints)
            test_labels.append(current_label)
            valid_count += 1
        else:
            skipped_count += 1

# Print the counts for skipped and valid entries
print(f"Testing: Valid entries processed: {valid_count}, Skipped entries: {skipped_count}")

# Convert to tensors
X_test = torch.tensor(test_keypoints_data, dtype=torch.float32)
y_test = torch.tensor(test_labels, dtype=torch.long)

# Create DataLoader for PyTorch
test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Ensure your model is on the correct device
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

# Evaluate the model on the test data
model.eval()
correct = 0
total = 0
all_preds = []
all_labels = []

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        outputs = model(X_batch)
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())

accuracy = correct / total
print(f"Test Accuracy: {accuracy * 100:.2f}%")

# Plot confusion matrix
cm = confusion_matrix(all_labels, all_preds)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=list(shot_type_to_label.keys()))
disp.plot(cmap="coolwarm")
plt.title("Confusion Matrix for Test Set")
plt.show()
Testing: Valid entries processed: 554, Skipped entries: 51
Test Accuracy: 86.82%
In [11]:
from sklearn.metrics import f1_score

# Calculate F1 score
f1 = f1_score(all_labels, all_preds, average='weighted')  # Use 'weighted' for imbalanced datasets
print(f"F1 Score (Weighted): {f1:.2f}")
F1 Score (Weighted): 0.87

The comparison across the two datasets worked very well: the accuracy rose from 25% to 87%. The strokes were classified with the model trained on the first keypoint log file and successfully applied to the unseen data of the second dataset, a great success!

Overall analysis of the results¶

The ResNet-18 model achieved excellent results when trained and tested on a single dataset, with an accuracy of 86%. However, its performance dropped drastically as soon as a second dataset was added for testing, falling even below random chance. One possible remedy would be to collect more images and train on the datasets jointly to improve generalization to unseen data; however, gathering further data for additional datasets would have been too time-consuming within the scope of this project.

The keypoint-based method showed strong performance, reaching an accuracy of about 87% across the two datasets. Remarkably, only keypoint information from text files was used. The results show that the keypoint method holds great potential for generalization problems, while ResNet-18 could be further improved by incorporating additional data.
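One plausible reason the keypoint features transfer across datasets is that MediaPipe landmarks are already normalized to image coordinates, so camera, background, and clothing differences largely drop out. A possible further refinement, sketched here as an idea rather than something used above, would be to additionally center each pose on the hip midpoint and rescale by the torso length, making the features invariant to where and how large the player appears in the frame (landmark indices follow the MediaPipe Pose convention: shoulders 11/12, hips 23/24):

```python
import numpy as np

def normalize_pose(keypoints):
    """Center a (33, 4) landmark array on the hip midpoint and rescale x/y/z
    by the torso length (shoulder midpoint to hip midpoint).
    The visibility column is left unchanged."""
    kp = np.asarray(keypoints, dtype=np.float32).reshape(33, 4).copy()
    hips = (kp[23, :3] + kp[24, :3]) / 2       # hip midpoint (MediaPipe indices 23, 24)
    shoulders = (kp[11, :3] + kp[12, :3]) / 2  # shoulder midpoint (indices 11, 12)
    torso = np.linalg.norm(shoulders - hips) + 1e-6  # avoid division by zero
    kp[:, :3] = (kp[:, :3] - hips) / torso
    return kp.flatten()  # 132-value feature vector, same shape as the classifier input

# Synthetic example: the same pose shifted sideways in the frame maps to
# the identical normalized representation.
rng = np.random.default_rng(0)
pose = rng.random((33, 4)).astype(np.float32)
shifted = pose.copy()
shifted[:, 0] += 0.3  # translate all x coordinates

print(np.allclose(normalize_pose(pose), normalize_pose(shifted), atol=1e-5))  # True
```

This normalization was not part of the pipeline above; it is one commonly used way to make pose features translation- and scale-invariant before feeding them to a classifier.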